from airflow.models.dag import DAG
from airflow.utils.dates import days_ago
import datetime
import pendulum
from dagen.operators import SSHOperator
from airflow.sensors.external_task import ExternalTaskMarker


# Jinja template string: the run's execution_date converted to Asia/Shanghai
# and formatted as YYYYMMDD.
# NOTE(review): not referenced anywhere in the visible part of this file.
pt = "{{ execution_date.in_tz('Asia/Shanghai').format('YYYYMMDD') }}"

# Timezone attached to the DAG's start_date.
local_tz = pendulum.timezone("Asia/Shanghai")

# Cron expression: minute 0, hour 1, every day.
schedule_interval = "0 1 * * *"

# DAG "cdp_create_oneid": no catchup of missed intervals; the default_args
# below are handed to the DAG for the tasks created under it.
dag = DAG(
    dag_id="cdp_create_oneid",
    catchup=False,
    start_date=datetime.datetime(year=2021, month=10, day=21, tzinfo=local_tz),
    schedule_interval=schedule_interval,
    default_args={
        'owner': 'Avris',
        'depends_on_past': False,
        'retries': 2,
        'email': ['zhujin@tesla.com'],
        'email_on_failure': True,
        'email_on_retry': False,
    },
)


# spark-submit command line (YARN, cluster deploy mode, queue "batch") for the
# com.tesla.cdp.oneid.CreateOneID class in the eden-dwh-cdp-oneid jar.
# The adjacent literals concatenate to exactly the original single-line
# command, including the double space that follows "--queue batch".
cmd = (
    "/opt/bigdata/spark/current/bin/spark-submit"
    " --master yarn"
    " --deploy-mode cluster"
    " --executor-memory 6g"
    " --executor-cores 3"
    " --num-executors 3"
    " --queue batch "
    " --class com.tesla.cdp.oneid.CreateOneID"
    " /mnt/app/eden-dwh/eden-dwh-cdp-oneid/lib/eden-dwh-cdp-oneid-1.0.jar"
)

# Task "t_cdp_create_oneid": hands `cmd` to the project's SSHOperator using the
# 'eden_yarn_stg' connection id (presumably executed on that remote host --
# confirm against dagen.operators.SSHOperator).
ssh_hook_task = SSHOperator(
    task_id="t_cdp_create_oneid",
    ssh_conn_id='eden_yarn_stg',
    command=cmd,
    dag=dag,
)

# ExternalTaskMarker tasks, one per (external_dag_id, external_task_id) pair
# listed below. Per the Airflow documentation, an ExternalTaskMarker records
# that a task in another DAG depends on this point in the current DAG.
#
# Each marker is bound to `dag` explicitly. Airflow applies a DAG's
# default_args when an operator is constructed, so a marker created without a
# DAG would only be attached later (implicitly, when dependencies are wired)
# and would never receive the DAG's owner / retries / email settings.
check_one_id_par_task001 = ExternalTaskMarker(
    task_id="check_one_id_par_task001",
    external_dag_id="dws_cdp_agg_smp_sm_activity_td_v2_generate",
    external_task_id="check_one_id_task001",
    dag=dag,
)

check_one_id_par_task002 = ExternalTaskMarker(
    task_id="check_one_id_par_task002",
    external_dag_id="cdp_arrive_store_label_generate",
    external_task_id="check_one_id_task002",
    dag=dag,
)

check_one_id_par_task003 = ExternalTaskMarker(
    task_id="check_one_id_par_task003",
    external_dag_id="cdp_charge_detail_gen",
    external_task_id="check_one_id_task003",
    dag=dag,
)

check_one_id_par_task004 = ExternalTaskMarker(
    task_id="check_one_id_par_task004",
    external_dag_id="cdp_purpose_car_generate",
    external_task_id="check_one_id_task004",
    dag=dag,
)

check_one_id_par_task005 = ExternalTaskMarker(
    task_id="check_one_id_par_task005",
    external_dag_id="cdp_arrive_store_participate_activity_gen",
    external_task_id="check_one_id_task005",
    dag=dag,
)

# Fan-out: all five marker tasks are set directly downstream of ssh_hook_task.
ssh_hook_task.set_downstream(
    [
        check_one_id_par_task001,
        check_one_id_par_task002,
        check_one_id_par_task003,
        check_one_id_par_task004,
        check_one_id_par_task005,
    ]
)
